/*
 * Copyright 2025 The RuleGo Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package window

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/rulego/streamsql/types"
	"github.com/rulego/streamsql/utils/cast"
)

// Ensure SessionWindow struct implements Window interface
var _ Window = (*SessionWindow)(nil)

// SessionWindow represents a session window
// Session window is an event-time based window that closes when no events arrive for a period of time
type SessionWindow struct {
	// config is the window configuration information
	config types.WindowConfig
	// timeout is the session timeout duration, session will close if no new events within this time
	timeout time.Duration
	// mu is used to protect concurrent access to window data
	mu sync.RWMutex
	// sessionMap stores session data for different keys
	sessionMap map[string]*session
	// outputChan is a channel for sending data when window triggers
	outputChan chan []types.Row
	// callback is an optional callback function called when window triggers
	callback func([]types.Row)
	// ctx is used to control window lifecycle
	ctx context.Context
	// cancelFunc is used to cancel window operations
	cancelFunc context.CancelFunc
	// Channel for initializing window
	initChan    chan struct{}
	initialized bool
	// Lock to protect ticker
	tickerMu sync.Mutex
	ticker   *time.Ticker
	// watermark for event time processing (only used for EventTime)
	watermark *Watermark
	// triggeredSessions stores sessions that have been triggered but are still open for late data (for EventTime with allowedLateness)
	triggeredSessions map[string]*sessionInfo
}

// sessionInfo stores information about a triggered session that is still open for late data
type sessionInfo struct {
	session   *session
	closeTime time.Time // session end + allowedLateness
}

// session stores data and state for a session
type session struct {
	data       []types.Row
	lastActive time.Time
	slot       *types.TimeSlot
}

// NewSessionWindow creates a new session window instance
func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error) {
	// Get timeout parameter from params array
	if len(config.Params) == 0 {
		return nil, fmt.Errorf("session window requires 'timeout' parameter")
	}

	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())

	timeoutVal := config.Params[0]
	timeout, err := cast.ToDurationE(timeoutVal)
	if err != nil {
		cancel()
		return nil, fmt.Errorf("invalid timeout for session window: %v", err)
	}

	// Use unified performance configuration to get window output buffer size
	bufferSize := 100 // Default value, session windows typically have smaller buffers
	if (config.PerformanceConfig != types.PerformanceConfig{}) {
		bufferSize = config.PerformanceConfig.BufferConfig.WindowOutputSize / 10 // Session window uses 1/10 of buffer
		if bufferSize < 10 {
			bufferSize = 10 // Minimum value
		}
	}

	// Determine time characteristic (default to ProcessingTime for backward compatibility)
	timeChar := config.TimeCharacteristic
	if timeChar == "" {
		timeChar = types.ProcessingTime
	}

	// Initialize watermark for event time
	var watermark *Watermark
	if timeChar == types.EventTime {
		maxOutOfOrderness := config.MaxOutOfOrderness
		if maxOutOfOrderness == 0 {
			maxOutOfOrderness = 0 // Default: no out-of-orderness allowed
		}
		watermarkInterval := config.WatermarkInterval
		if watermarkInterval == 0 {
			watermarkInterval = 200 * time.Millisecond // Default: 200ms
		}
		idleTimeout := config.IdleTimeout
		// Default: 0 means disabled, no idle source mechanism
		watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
	}

	return &SessionWindow{
		config:            config,
		timeout:           timeout,
		sessionMap:        make(map[string]*session),
		outputChan:        make(chan []types.Row, bufferSize),
		ctx:               ctx,
		cancelFunc:        cancel,
		initChan:          make(chan struct{}),
		initialized:       false,
		watermark:         watermark,
		triggeredSessions: make(map[string]*sessionInfo),
	}, nil
}

// Add adds data to session window
func (sw *SessionWindow) Add(data interface{}) {
	// Lock to ensure thread safety
	sw.mu.Lock()
	defer sw.mu.Unlock()

	if !sw.initialized {
		// Safely close initChan to avoid closing an already closed channel
		select {
		case <-sw.initChan:
			// Already closed, do nothing
		default:
			close(sw.initChan)
		}
		sw.initialized = true
	}

	// Get data timestamp
	timestamp := GetTimestamp(data, sw.config.TsProp, sw.config.TimeUnit)

	// Determine time characteristic (default to ProcessingTime for backward compatibility)
	timeChar := sw.config.TimeCharacteristic
	if timeChar == "" {
		timeChar = types.ProcessingTime
	}

	// For event time, update watermark and check for late data
	if timeChar == types.EventTime && sw.watermark != nil {
		sw.watermark.UpdateEventTime(timestamp)
		// Check if data is late and handle allowedLateness
		if sw.watermark.IsEventTimeLate(timestamp) {
			// Data is late, check if it's within allowedLateness
			allowedLateness := sw.config.AllowedLateness
			if allowedLateness > 0 {
				// Check if this late data belongs to any triggered session that's still open
				sw.handleLateData(timestamp, allowedLateness)
			}
			// If allowedLateness is 0 or data is too late, we still add it but it won't trigger updates
		}
	}

	// Create Row object
	row := types.Row{
		Data:      data,
		Timestamp: timestamp,
	}

	// Extract session key (supports multiple group by keys)
	key := extractSessionCompositeKey(data, sw.config.GroupByKeys)

	// Get or create session
	s, exists := sw.sessionMap[key]
	if !exists {
		// Create new session
		// Use the actual timestamp of the first data point as session start
		// No alignment needed - session starts from when first data arrives
		start := timestamp
		end := start.Add(sw.timeout)
		slot := types.NewTimeSlot(&start, &end)

		s = &session{
			data:       []types.Row{},
			lastActive: timestamp,
			slot:       slot,
		}
		sw.sessionMap[key] = s
	} else {
		// Update session end time
		if timestamp.After(s.lastActive) {
			s.lastActive = timestamp
			// Extend session end time
			newEnd := timestamp.Add(sw.timeout)
			if newEnd.After(*s.slot.End) {
				s.slot.End = &newEnd
			}
		}
	}

	// Add data to session
	row.Slot = s.slot
	s.data = append(s.data, row)
}

// Start starts the session window's periodic check mechanism
// Start starts the session window, begins periodic checking of expired sessions
// Uses lazy initialization mode to avoid infinite waiting when no data, while ensuring subsequent data can be processed normally
func (sw *SessionWindow) Start() {
	// Determine time characteristic (default to ProcessingTime for backward compatibility)
	timeChar := sw.config.TimeCharacteristic
	if timeChar == "" {
		timeChar = types.ProcessingTime
	}

	if timeChar == types.EventTime {
		// Event time: trigger based on watermark
		sw.startEventTime()
	} else {
		// Processing time: trigger based on system clock
		sw.startProcessingTime()
	}
}

// startProcessingTime starts the processing time trigger mechanism
func (sw *SessionWindow) startProcessingTime() {
	go func() {
		// Close output channel when function ends
		defer close(sw.outputChan)

		// Wait for initialization completion or context cancellation
		select {
		case <-sw.initChan:
			// Normal initialization completed, continue processing
		case <-sw.ctx.Done():
			// Context cancelled, exit directly
			return
		}

		// Periodically check expired sessions
		sw.tickerMu.Lock()
		sw.ticker = time.NewTicker(sw.timeout / 2)
		ticker := sw.ticker
		sw.tickerMu.Unlock()

		defer func() {
			sw.tickerMu.Lock()
			if sw.ticker != nil {
				sw.ticker.Stop()
			}
			sw.tickerMu.Unlock()
		}()

		for {
			select {
			case <-ticker.C:
				sw.checkExpiredSessions()
			case <-sw.ctx.Done():
				return
			}
		}
	}()
}

// startEventTime starts the event time trigger mechanism based on watermark
func (sw *SessionWindow) startEventTime() {
	go func() {
		// Close output channel when function ends
		defer close(sw.outputChan)
		if sw.watermark != nil {
			defer sw.watermark.Stop()
		}

		// Wait for initialization completion or context cancellation
		select {
		case <-sw.initChan:
			// Normal initialization completed, continue processing
		case <-sw.ctx.Done():
			// Context cancelled, exit directly
			return
		}

		// Process watermark updates
		if sw.watermark != nil {
			for {
				select {
				case watermarkTime := <-sw.watermark.WatermarkChan():
					sw.checkAndTriggerSessions(watermarkTime)
				case <-sw.ctx.Done():
					return
				}
			}
		} else {
			// If watermark is nil, just wait for context cancellation
			<-sw.ctx.Done()
			return
		}
	}()
}

// Stop stops session window operations
func (sw *SessionWindow) Stop() {
	// Call cancel function to stop window operations
	sw.cancelFunc()

	// Safely stop ticker
	sw.tickerMu.Lock()
	if sw.ticker != nil {
		sw.ticker.Stop()
	}
	sw.tickerMu.Unlock()

	// Stop watermark (for event time)
	if sw.watermark != nil {
		sw.watermark.Stop()
	}

	// Ensure initChan is closed if it hasn't been closed yet
	// This prevents Start() goroutine from blocking on initChan
	sw.mu.Lock()
	if !sw.initialized && sw.initChan != nil {
		select {
		case <-sw.initChan:
			// Already closed, do nothing
		default:
			close(sw.initChan)
		}
	}
	sw.mu.Unlock()
}

func (sw *SessionWindow) checkExpiredSessions() {
	sw.mu.Lock()
	now := time.Now()
	resultsToSend := sw.collectExpiredSessions(now)
	sw.mu.Unlock()

	sw.sendResults(resultsToSend)
}

func (sw *SessionWindow) checkAndTriggerSessions(watermarkTime time.Time) {
	sw.mu.Lock()
	resultsToSend := sw.collectExpiredSessions(watermarkTime)
	sw.closeExpiredSessions(watermarkTime)
	sw.mu.Unlock()

	sw.sendResults(resultsToSend)
}

func (sw *SessionWindow) collectExpiredSessions(currentTime time.Time) [][]types.Row {
	expiredKeys := []string{}
	for key, s := range sw.sessionMap {
		// For event time, use slot.End to determine if session expired
		// Session expires when watermark >= session end time
		// For processing time, use lastActive + timeout
		if s.slot.End != nil && !currentTime.Before(*s.slot.End) {
			expiredKeys = append(expiredKeys, key)
		} else if currentTime.Sub(s.lastActive) > sw.timeout {
			expiredKeys = append(expiredKeys, key)
		}
	}

	resultsToSend := make([][]types.Row, 0)
	allowedLateness := sw.config.AllowedLateness

	for _, key := range expiredKeys {
		s := sw.sessionMap[key]
		if len(s.data) > 0 {
			result := make([]types.Row, len(s.data))
			copy(result, s.data)
			resultsToSend = append(resultsToSend, result)

			if allowedLateness > 0 {
				closeTime := s.slot.End.Add(allowedLateness)
				sw.triggeredSessions[key] = &sessionInfo{
					session:   s,
					closeTime: closeTime,
				}
			}
		}
		delete(sw.sessionMap, key)
	}

	return resultsToSend
}

func (sw *SessionWindow) sendResults(resultsToSend [][]types.Row) {
	for _, result := range resultsToSend {
		// Skip empty results to avoid filling up channels
		if len(result) == 0 {
			continue
		}

		if sw.callback != nil {
			sw.callback(result)
		}

		select {
		case sw.outputChan <- result:
		default:
		}
	}
}

// Trigger manually triggers all session windows
func (sw *SessionWindow) Trigger() {
	sw.mu.Lock()

	// Collect all results first
	resultsToSend := make([][]types.Row, 0)
	for _, s := range sw.sessionMap {
		if len(s.data) > 0 {
			// Trigger session window
			result := make([]types.Row, len(s.data))
			copy(result, s.data)
			resultsToSend = append(resultsToSend, result)
		}
	}
	// Clear all sessions
	sw.sessionMap = make(map[string]*session)

	// Release lock before sending to channel and calling callback to avoid blocking
	sw.mu.Unlock()

	// Send results and call callbacks outside of lock to avoid blocking
	for _, result := range resultsToSend {
		// Skip empty results to avoid filling up channels
		if len(result) == 0 {
			continue
		}

		// If callback function is set, execute it
		if sw.callback != nil {
			sw.callback(result)
		}

		// Non-blocking send to output channel
		select {
		case sw.outputChan <- result:
			// Successfully sent
		default:
			// Channel full, drop result (could add statistics here if needed)
		}
	}
}

// Reset resets session window data
func (sw *SessionWindow) Reset() {
	sw.mu.Lock()
	defer sw.mu.Unlock()

	// Stop existing ticker
	sw.tickerMu.Lock()
	if sw.ticker != nil {
		sw.ticker.Stop()
		sw.ticker = nil
	}
	sw.tickerMu.Unlock()

	// Stop watermark (for event time)
	if sw.watermark != nil {
		sw.watermark.Stop()
		// Recreate watermark
		timeChar := sw.config.TimeCharacteristic
		if timeChar == "" {
			timeChar = types.ProcessingTime
		}
		if timeChar == types.EventTime {
			maxOutOfOrderness := sw.config.MaxOutOfOrderness
			if maxOutOfOrderness == 0 {
				maxOutOfOrderness = 0
			}
			watermarkInterval := sw.config.WatermarkInterval
			if watermarkInterval == 0 {
				watermarkInterval = 200 * time.Millisecond
			}
			idleTimeout := sw.config.IdleTimeout
			sw.watermark = NewWatermark(maxOutOfOrderness, watermarkInterval, idleTimeout)
		}
	}

	// Clear session data
	sw.sessionMap = make(map[string]*session)
	sw.triggeredSessions = make(map[string]*sessionInfo)
	sw.initialized = false
	sw.initChan = make(chan struct{})
}

// OutputChan returns a read-only channel for receiving data when window triggers
func (sw *SessionWindow) OutputChan() <-chan []types.Row {
	return sw.outputChan
}

// SetCallback sets the callback function when session window triggers
func (sw *SessionWindow) SetCallback(callback func([]types.Row)) {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	sw.callback = callback
}

// handleLateData handles late data that arrives within allowedLateness
func (sw *SessionWindow) handleLateData(eventTime time.Time, allowedLateness time.Duration) {
	sw.mu.Lock()
	defer sw.mu.Unlock()

	// Find which triggered session this late data belongs to
	for _, info := range sw.triggeredSessions {
		if info.session.slot.Contains(eventTime) {
			// This late data belongs to a triggered session that's still open
			// Trigger session again with updated data (late update)
			sw.triggerLateUpdateLocked(info.session)
			return
		}
	}
}

// triggerLateUpdateLocked triggers a late update for a session (must be called with lock held)
func (sw *SessionWindow) triggerLateUpdateLocked(s *session) {
	if len(s.data) == 0 {
		return
	}

	// Extract session data including late data
	resultData := make([]types.Row, len(s.data))
	copy(resultData, s.data)

	// Get callback reference before releasing lock
	callback := sw.callback

	// Release lock before calling callback and sending to channel to avoid blocking
	sw.mu.Unlock()

	if callback != nil {
		callback(resultData)
	}

	// Non-blocking send to output channel
	select {
	case sw.outputChan <- resultData:
		// Successfully sent
	default:
		// Channel full, drop result
	}

	// Re-acquire lock
	sw.mu.Lock()
}

// closeExpiredSessions closes sessions that have exceeded allowedLateness
func (sw *SessionWindow) closeExpiredSessions(watermarkTime time.Time) {
	for key, info := range sw.triggeredSessions {
		if !watermarkTime.Before(info.closeTime) {
			// Session has expired, remove it
			delete(sw.triggeredSessions, key)
		}
	}
}

// extractSessionCompositeKey builds composite session key from multiple group fields
// If GroupByKeys is empty, returns default key
func extractSessionCompositeKey(data interface{}, keys []string) string {
	if len(keys) == 0 {
		return "default"
	}
	parts := make([]string, 0, len(keys))
	if m, ok := data.(map[string]interface{}); ok {
		for _, k := range keys {
			parts = append(parts, fmt.Sprintf("%v", m[k]))
		}
		return strings.Join(parts, "|")
	}
	return "default"
}
